{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)\n",
    "\n",
    "# 6 - Amazon Athena\n",
    "\n",
    "[awswrangler](https://github.com/aws/aws-sdk-pandas) has three ways to run queries on Athena and fetch the result as a DataFrame:\n",
    "\n",
    "- **ctas_approach=True** (Default)\n",
    "\n",
    "    Wraps the query with a CTAS and then reads the table data as parquet directly from s3.\n",
    "    \n",
    "    * `PROS`:\n",
    "        - Faster for mid and big result sizes.\n",
    "        - Can handle some level of nested types.\n",
    "    * `CONS`:\n",
    "         - Requires create/delete table permissions on Glue.\n",
    "         - Does not support timestamp with time zone\n",
    "         - Does not support columns with repeated names.\n",
    "         - Does not support columns with undefined data types.\n",
    "         - A temporary table will be created and then deleted immediately.\n",
    "         - Does not support custom data_source/catalog_id.\n",
    "\n",
    "- **unload_approach=True and ctas_approach=False**\n",
    "\n",
    "    Does an UNLOAD query on Athena and parse the Parquet result on s3.\n",
    "\n",
    "    * `PROS`:\n",
    "        - Faster for mid and big result sizes.\n",
    "        - Can handle some level of nested types.\n",
    "        - Does not modify Glue Data Catalog.\n",
    "    * `CONS`:\n",
    "        - Output S3 path must be empty.\n",
    "        - Does not support timestamp with time zone\n",
    "        - Does not support columns with repeated names.\n",
    "        - Does not support columns with undefined data types.\n",
    "\n",
    "- **ctas_approach=False**\n",
    "\n",
    "    Does a regular query on Athena and parse the regular CSV result on s3.\n",
    "    \n",
    "    * `PROS`:\n",
    "        - Faster for small result sizes (less latency).\n",
    "        - Does not require create/delete table permissions on Glue\n",
    "        - Supports timestamp with time zone.\n",
    "        - Support custom data_source/catalog_id.\n",
    "    * `CONS`:\n",
    "        - Slower (But stills faster than other libraries that uses the regular Athena API)\n",
    "        - Does not handle nested types at all."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import awswrangler as wr"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Enter your bucket name:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import getpass\n",
    "\n",
    "bucket = getpass.getpass()\n",
    "path = f\"s3://{bucket}/data/\""
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Checking/Creating Glue Catalog Databases"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "if \"awswrangler_test\" not in wr.catalog.databases().values:\n",
    "    wr.catalog.create_database(\"awswrangler_test\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Creating a Parquet Table from the NOAA's CSV files\n",
    "\n",
    "[Reference](https://registry.opendata.aws/noaa-ghcn/)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "is_executing": true
    }
   },
   "outputs": [],
   "source": [
    "cols = [\"id\", \"dt\", \"element\", \"value\", \"m_flag\", \"q_flag\", \"s_flag\", \"obs_time\"]\n",
    "\n",
    "df = wr.s3.read_csv(\n",
    "    path=\"s3://noaa-ghcn-pds/csv/by_year/189\", names=cols, parse_dates=[\"dt\", \"obs_time\"]\n",
    ")  # Read 10 files from the 1890 decade (~1GB)\n",
    "\n",
    "df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "is_executing": true
    }
   },
   "outputs": [],
   "source": [
    "wr.s3.to_parquet(df=df, path=path, dataset=True, mode=\"overwrite\", database=\"awswrangler_test\", table=\"noaa\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "is_executing": true
    }
   },
   "outputs": [],
   "source": [
    "wr.catalog.table(database=\"awswrangler_test\", table=\"noaa\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Reading with ctas_approach=False"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "is_executing": true
    }
   },
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "wr.athena.read_sql_query(\"SELECT * FROM noaa\", database=\"awswrangler_test\", ctas_approach=False)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Default with ctas_approach=True - 13x faster (default)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "is_executing": true
    }
   },
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "wr.athena.read_sql_query(\"SELECT * FROM noaa\", database=\"awswrangler_test\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Using categories to speed up and save memory - 24x faster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "is_executing": true
    }
   },
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "wr.athena.read_sql_query(\n",
    "    \"SELECT * FROM noaa\",\n",
    "    database=\"awswrangler_test\",\n",
    "    categories=[\"id\", \"dt\", \"element\", \"value\", \"m_flag\", \"q_flag\", \"s_flag\", \"obs_time\"],\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Reading with unload_approach=True"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "is_executing": true,
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "wr.athena.read_sql_query(\n",
    "    \"SELECT * FROM noaa\",\n",
    "    database=\"awswrangler_test\",\n",
    "    ctas_approach=False,\n",
    "    unload_approach=True,\n",
    "    s3_output=f\"s3://{bucket}/unload/\",\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Batching (Good for restricted memory environments)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "is_executing": true
    }
   },
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "dfs = wr.athena.read_sql_query(\n",
    "    \"SELECT * FROM noaa\",\n",
    "    database=\"awswrangler_test\",\n",
    "    chunksize=True,  # Chunksize calculated automatically for ctas_approach.\n",
    ")\n",
    "\n",
    "for df in dfs:  # Batching\n",
    "    print(len(df.index))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "is_executing": true
    }
   },
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "dfs = wr.athena.read_sql_query(\"SELECT * FROM noaa\", database=\"awswrangler_test\", chunksize=100_000_000)\n",
    "\n",
    "for df in dfs:  # Batching\n",
    "    print(len(df.index))"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Parameterized queries"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Client-side parameter resolution\n",
    "\n",
    "The `params` parameter allows client-side resolution of parameters, which are specified with `:col_name`, when `paramstyle` is set to `named`.\n",
    "Additionally, Python types will map to the appropriate Athena definitions.\n",
    "For example, the value `dt.date(2023, 1, 1)` will resolve to `DATE '2023-01-01`.\n",
    "\n",
    "For the example below, the following query will be sent to Athena:\n",
    "```sql\n",
    "SELECT * FROM noaa WHERE S_FLAG = 'E'\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "wr.athena.read_sql_query(\n",
    "    \"SELECT * FROM noaa WHERE S_FLAG = :flag_value\",\n",
    "    database=\"awswrangler_test\",\n",
    "    params={\n",
    "        \"flag_value\": \"E\",\n",
    "    },\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Server-side parameter resolution\n",
    "\n",
    "Alternatively, Athena supports server-side parameter resolution when `paramstyle` is defined as `qmark`.\n",
    "The SQL statement sent to Athena will not contain the values passed in `params`.\n",
    "Instead, they will be passed as part of a separate `params` parameter in `boto3`.\n",
    "\n",
    "The downside of using this approach is that types aren't automatically resolved.\n",
    "The values sent to `params` must be strings.\n",
    "Therefore, if one of the values is a date, the value passed in `params` has to be `DATE 'XXXX-XX-XX'`.\n",
    "\n",
    "The upside, however, is that these parameters can be used with prepared statements.\n",
    "\n",
    "For more information, see \"[Using parameterized queries](https://docs.aws.amazon.com/athena/latest/ug/querying-with-prepared-statements.html)\"."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "wr.athena.read_sql_query(\n",
    "    \"SELECT * FROM noaa WHERE S_FLAG = ?\",\n",
    "    database=\"awswrangler_test\",\n",
    "    params=[\"E\"],\n",
    "    paramstyle=\"qmark\",\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prepared statements"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "wr.athena.create_prepared_statement(\n",
    "    sql=\"SELECT * FROM noaa WHERE S_FLAG = ?\",\n",
    "    statement_name=\"statement\",\n",
    ")\n",
    "\n",
    "# Resolve parameter using Athena execution parameters\n",
    "wr.athena.read_sql_query(\n",
    "    sql=\"EXECUTE statement\",\n",
    "    database=\"awswrangler_test\",\n",
    "    params=[\"E\"],\n",
    "    paramstyle=\"qmark\",\n",
    ")\n",
    "\n",
    "# Resolve parameter using Athena execution parameters (same effect as above)\n",
    "wr.athena.read_sql_query(\n",
    "    sql=\"EXECUTE statement USING ?\",\n",
    "    database=\"awswrangler_test\",\n",
    "    params=[\"E\"],\n",
    "    paramstyle=\"qmark\",\n",
    ")\n",
    "\n",
    "# Resolve parameter using client-side formatter\n",
    "wr.athena.read_sql_query(\n",
    "    sql=\"EXECUTE statement USING :flag_value\",\n",
    "    database=\"awswrangler_test\",\n",
    "    params={\n",
    "        \"flag_value\": \"E\",\n",
    "    },\n",
    "    paramstyle=\"named\",\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Clean up prepared statement\n",
    "wr.athena.delete_prepared_statement(statement_name=\"statement\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Cleaning Up S3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "is_executing": true
    }
   },
   "outputs": [],
   "source": [
    "wr.s3.delete_objects(path)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Delete table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "is_executing": true
    }
   },
   "outputs": [],
   "source": [
    "wr.catalog.delete_table_if_exists(database=\"awswrangler_test\", table=\"noaa\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Delete Database"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "is_executing": true
    }
   },
   "outputs": [],
   "source": [
    "wr.catalog.delete_database(\"awswrangler_test\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3.9.14",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
